package com.imooc.spark.kafka1;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Created by zghgchao 2017/11/14 10:45
 * <p>
 * Kafka producer that runs as its own thread. Once started it sends the
 * messages {@code message_1}, {@code message_2}, ... to the configured topic,
 * pausing {@value #SEND_INTERVAL_MS} ms after each send, until the thread is
 * interrupted. On exit the underlying Kafka producer is closed.
 */
public class KafkaProducer extends Thread {

    /** Pause after each send, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 2000L;

    /** Topic every message is sent to. */
    private final String topic;

    /** Underlying Kafka client; messages are sent without a key. */
    private final Producer<Integer, String> producer;

    /**
     * Creates a producer for the given topic, connecting to the brokers listed
     * in {@code KafkaProperties.BROKER_LIST}.
     *
     * @param topic name of the topic the messages are sent to
     */
    public KafkaProducer(String topic) {
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * The number of acknowledgments the producer requires the leader to have
         * received before considering a request complete. This controls the
         * durability of the messages sent by the producer.
         *
         * request.required.acks = 0  - the producer does not wait for any
         *                              acknowledgement from the leader (fire and forget).
         * request.required.acks = 1  - the leader writes the message to its local
         *                              log and acknowledges immediately.
         * request.required.acks = -1 - the leader waits for acknowledgement from
         *                              all in-sync replicas before acknowledging the write.
         */
        properties.put("request.required.acks", "1");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    /**
     * Sends one numbered message per iteration and then sleeps, until the
     * executing thread is interrupted. The Kafka producer is closed when the
     * loop ends, whether through interruption or through an exception thrown
     * while sending.
     */
    @Override
    public void run() {
        int messageNo = 1;

        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = "message_" + messageNo;
                producer.send(new KeyedMessage<Integer, String>(topic, message));
                System.out.println("Sent: " + message);

                messageNo++;

                Thread.sleep(SEND_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: restore the flag for callers
            // further up the stack and fall through to the cleanup below.
            Thread.currentThread().interrupt();
        } finally {
            // Release the producer's resources once sending has stopped.
            producer.close();
        }
    }
}
